查看原文
其他

基于 Flink 构建 CEP 引擎的挑战和实践

The following article is from Ververica Author 韩鹏@奇安信

摘要:奇安信集团作为一家网络安全公司是如何基于 Flink 构建 CEP 引擎实时检测网络攻击?其中面临的挑战以及宝贵的实践经验有哪些?本文主要内容分为以下四个方面:
  1. 背景及现状

  2. 技术架构

  3. 产品及运维

  4. 未来发展与思考


重要:后台回复「实时数仓」可查看 Flink Forward Asia 大会视频。 


背景及现状


奇安信集团作为一家网络安全公司,专门为政府、企业,教育、金融等机构和组织提供企业级网络安全技术、产品和服务,奇安信的 NGSOC 产品的核心引擎是一个 CEP 引擎,用于实时检测网络攻击,其技术演进过程如下图所示。


  • 2015 年开始使用基于 Esper 的 CEP 方案,但是当时遇到了很多问题,其中最显著的是性能问题,因为 Esper 对于规则条目的支持数量不多,一般情况下超过几十条就会受到严重影响;

  • 2017 年奇安信的技术方案演进到了使用 C++ 实现的 Dolphin 1.0,其在单机上的性能表现大幅度提升;

  • 2018 年奇安信决定将技术方案全面转向基于 Flink 的 Sabre。


 

奇安信产品具体的应用场景是企业系统的安全检测和数据分析,其自下而上分为四个业务处理流程,分别是数据的采集、解析、处理和展示结果,这其中最核心的是第三层数据处理。该产品的用户主要是安全规则团队,其可以使用规则编辑器来对安全规则进行添加、删除、编辑和查找操作,并可批量启动/停用多个规则,同时可以将处于启动状态的有效规则统一发送给产品。

 

在数据规模方面,产品解决的不是一个或几个大型数据集群的问题,而是数以百计的中小型数据集群的运维问题。在 B2B 领域,由于产品是直接部署到客户方,很多客户使用的是内部隔离网,无法连接外网,且没有专门人员负责集群的运维,这种情况下哪怕一个小升级都会耗费大量时间。因此,产品更多关注该领域下数据集群可运维性问题的解决。



奇安信在最初计划使用 Flink 作为技术方案并进行调研的过程中,发现了其一系列的痛点问题。由于企业级硬件资源环境受限,规则集数量及种类不确定,使得 Flink 程序运行难以控制,并且现有的库“Flink SQL”和“Flink CEP”均不能满足其业务性能需求。具体的痛点如下:


1.不能进行语义优化、不便于动态更新规则


网络安全事件井喷式发生的今天,安全需求迅速扩展。为了能够在有限时间内对特定语义的快速支持,关联引擎的整体架构必须异常灵活,才能适应未来安全分析场景的各种需求,而基于开源关联引擎实现的产品会在激烈的需求变化时遇到很多问题。


2.状态监控 & 高可用支持不足


面向企业级的网络安全监测引擎具有一些特定需求,当前解决方案对此支持较差。


  • 比如,现实情况中客户对算子实例和 Taskmanager 概念较为模糊,真正关心的运行状态的基本单位是规则。Flink 监控页面显示的是算子实例及 Task manager 进程整体内存的运行状态,而在网络安全监控的业务场景中,对运行状态和资源的监控均需要细化到规则层面。

  • 其次,在算子层面,Flink 原生 Window 算子,没有较好的资源(CPU / 内存)保护机制,且存在大量重复告警,不符合网络安全监测领域的业务需求。

  • 再次,Flink 缺乏一些必要算子,例如不支持“不发生算子”。一个较为常见的应用场景,某条规则指定在较长时间内没收到某台服务器的系统日志,则认为此台服务器发生了异常,需要及时通知用户。


3.CEP 网络负载高、CPU 利用率低


和互联网企业内部使用的大型集群相比,奇安信面向的企业级应用集群规模较小,硬件资源受限,且客户的定制需求较多,导致安全监测的规则要求更严格,引擎发布成本较高。但是,现有的 Flink 开源解决方案,或者需要根据业务需求进行改造,或者性能较差,均不能较好地解决上述问题。


  • 首先,原生 Flink 只提供了函数式编程模式,即需要手动编写复合特定业务需求的固定程序代码,由此导致开发测试周期较长,不便于动态更新规则,可复用性较弱,且不能从全局语义层面进行优化,性能较差。

  • 其次,Flink-CEP 仅是一个受限的序列算子,在运行时需要将所有数据传输到 CEP 算子,然后在 CEP 算子中串行执行各个条件语句。这种汇集到单点的运行模式,较多的冗余数据需要执行条件匹配,由此产生了不必要的网络负载,而且降低了 CPU 利用率。

  • 再次,还存在一些非官方开源的轻量级 CEP 引擎,比如 Flink-siddhi,功能简单,不是一个完整的解决方案。


 

其他的痛点问题还包括不支持空值窗口出发、以及聚合不保存原始数据等。



为了解决上述问题,奇安信在 Flink 的基础上推出了一种全新的 CEP 引擎,  Sabre。其整体架构如下图所示,其中包含三大核心模块,左侧是配置端,中间是 Sabre-server,右侧是 Sabre 运行端。核心数据流存在两条主线,红线表示规则的提交、编译、发布和运行流程。绿线表示状态监控的生成、收集、统计和展示流程。如图所示,此架构与 Hive 极为相似,是一种通用的大数据 OLAP 系统架构。下面详细介绍三大核心模块和两大核心数据流。



  • 首先,通过规则配置端创建规则,采用性能保护配置端修改性能保护策略;

  • 然后,将任务所属的规则文件和性能保护策略文件一并推送到 Sabre-server 提供的 REST 接口,该接口会调用文件解析及优化方法构建规则有向无环图。

  • 接着,执行词法语法分析方法,将规则有向无环图中各个节点的 EPL 转换为与其对应的 AST(AbstractSyntax Tree,抽象语法树),再将 AST 翻译为任务 java 代码。

  • 最后,调用 maven 命令打包 java 代码为任务 jar 包,并将任务 jar 包及基础运行库一并提交到 Flink-on-YARN 集群。

 

Flink 有多种运行模式(例如 standalone Flink cluster、Flink cluster on YARN、Flink job on YARN 等),Sabre 采用了“Flink job on YARN”模式,在奇安信 NGSOC 应用的特定场景下,采用 YARN 可统一维护硬件资源,并且使用 Flink job on YARN 可与 Hadoop 平台进行无缝对接,以此很好的实现了任务间资源隔离。

 

在 Sabre 任务执行过程中,Kafka 数据源向引擎提供原始事件。引擎处理结果分为回注事件和告警事件两类。告警事件会输出到目的 Kafka,供下级应用消费。回注事件表示一条规则的处理结果可直接回注到下级规则,作为下级规则的数据源事件,由此可实现规则的相互引用。

 

绿线流程表示任务执行过程中会定时输出节点的运行监控消息到 Sabre-server 的监控消息缓存器,然后监控消息统计器再汇总各个规则实例的运行监控消息,统计为整条规则的运行监控状态,最后通过 Sabre-server 提供的 REST 接口推送给规则监控端。


技术架构



Sabre 的组件依赖与版本兼容情况如下图所示。


  • 大多数情况下,奇安信会以黑盒的方式发布产品,但是如果用户方已经部署大数据处理平台,则产品会以 APP 的方式提供使用。

  • 由于客户规模较大,项目种类较多,部署环境较为复杂,或者存在多种 Yarn 集群版本,或者 Sabre 需作为单一 Flink 应用发布到客户已部署的 Flink 集群。

  • 如何节省成本及提高实施效率,快速适配上述复杂的部署环境是个亟需解决的问题,为此 Sabre 的设计原则是仅采用 Flink 的分布式计算能力,业务代码尽可能减少对 API 层的依赖,以便于兼容多种 Flink 版本。


如图所示,Deploy、Core、APIs、Libraries 四层是大家熟知的 Flink 基本的组件栈。Sabre 对 API 层的依赖降到了最低,只引用了 DataStream、KeyedStream 和 SplitStream 三种数据流 API。函数依赖只包括 DataStream 的 assignTimestamps、flatMap、union、keyBy、split、process、addSink 等函数,KeyedStream 最基础的 process 函数,以及 SplitStream 的 select 函数。由于依赖的 Flink API 较少,Sabre 可以很容易适配到各个 Flink 版本,从而具有良好的 Flink 版本兼容性。



在算子方面,Sabre 对 Flink 进行了一系列的重构,下图展示了这 Flink 和 Sabre 这二者之间的对比关系,其中主要包含三列,即 Flink 原生算子、Sabre 算子和两者之间的比较结果。比较结果主要有四种情况,相同(Same)、实现(Implement)、优化(优化)和新增(New)。Sabre 共有 13 种完全自研的核心算子,其中 Datasource、CustomKafkaSink 和 CustomDatabase 按照 Flink 接口要求做了具体实现,Filter、Key、Join 和 Aggregation 按照 Flink 原有算子的语义做了重新实现,CustomWindow 和 Sequence 在 Flink 原有算子语义的基础上做了优化实现。


 

下图展示了 Sabre 的规则与 EPL 设计。序列 Sequence、聚合 Aggregation、不发生 NotOccur、流式机器学习 StreamML 和连接 Join 均属于 Window 执行时间包含的计算性算子。蓝色虚线表示引用动态数据(Dynamic data),紫色虚线表示 Filter 无须经过 Window 可直连输出组件。




Window 算子


众所周知,Join 和 Aggregation 的时间范围由 Window 限定,而 Flink 原有 Window 算子不适合网络安全监测需求,为此 Sabre 设计了一种“自定义 Window 算子”,且重新实现了与“自定义 Window 算子”相匹配的 Join 和 Aggregation 算子。全新的 Window 具有以下六个主要特点:


  • 实时触发、即刻匹配:其目的是为了满足自动化实时响应的需求,一旦告警发出,会及时触发响应;

  • 匹配不重复:重复告警对于规则引擎来讲是一个常见问题,大量重复告警会增加安全人员的工作量,而该算子会将整个窗口与告警相关的事件全部清空,以此减少重复告警的数量;

  • 纠正乱序:将 Window 窗口以特定单位为边界切成一个个的时间槽,一旦发现乱序情况,插入乱序事件时可直接定位时间槽,基于流式状态机进行局部计算,并且窗口事件超时,同步更新计算性算子的值,并入 count 算子,删除超时事件的同时,同步减少 count 值;

  • 实时资源和状态监控:由于 Window 对与内存和 CPU 的影响比较大,因此需要对该类资源进行特别监控以及保护;

  • 流量控制:主要是为了更好地保护下级应用。




Sequence 序列算子


Sabre 用 EPL 对 Flink CEP 实现的序列算子进行了重新设计,左边是 Flink CEP 官方代码展示,采用程序代码的方式拼凑“NFA 自动机”。右边是 Sabre 中 Sequence 算子的实现方式,其中包含了三个不同的 filter,通过正则表达式的使用来提升其表达的能力,并且,Sabre 将 filter 前置,无效事件不会传输到 window 算子,从而较少了不必要的网络负载。并且,只有较少的有效数据需要执行正则匹配,降低了 CPU 利用率(filter 可以并行)。




NotOccur 不发生算子


NotOccur 是 Sabre 在 Flink 基础上新增的一个算子,支持空事件触发。


 


Trigger 全局算子


Sabre 还实现了一种针对窗口的全局触发器 Trigger,Trigger 能够将多个子计算性算子组合为复杂表达式,并实现了具有 GroupBy/Distinct 功能的 Key 算子以适配此 Trigger 算子。


 


Dynamic Data


Dynamicdata 可以映射为数据库中的一个表,但是对这个表要进行特别的优化,具体来讲,如果一个事件的 IP 在威胁情报列表中,而这个威胁情报有可能比较长,比如十几万行甚至更长,这种情况下需要对该表数据结构进行优化以提升效率。Dynamic data 可以在其他算子中使用,如 Filter、Join 等。




流式统计与机器学习 StreamML

 


机器学习在网络异常检测上已经越来越重要,为适应实时检测的需求,Sabre 没有使用 Flink MachineLearning,而是引入了自研的流式机器学习算子 StreamML。


Flink MachineLearning 是一种基于批模式 DataSetApi 实现的机器学习函数库,而 StreamML 是一种流式的机器学习算子,其目的是为了满足网络安全监测的特定需求。与阿里巴巴开源的 Alink 相比,StreamML 允许机器学习算法工程师通过配置规则的方式即可快速验证算法模型,无需编写任何程序代码。并且,流式机器学习算子 StreamML 实现了“模型训练/更新”与“模型使用”统一的理念。其核心功能是通过算法、技术及模型实现数据训练及对新数据检测。该流式机器学习算子 StreamML 引入的输入有三类,分别是:事件流、检测对象和对象属性;输出也包含三类,分别是:事件、告警和预警。


 

流式机器学习算子 StreamML 的组件栈包含三部分,从下往上依次为:机器学习方法、应用场景和产品业务。通过基本的机器学习算法(比如:统计学习算法、序列分析算法、聚类分析算法),流式机器学习算子 StreamML 可满足具体特定的安全监测应用场景(比如:行为特征异常检测、时间序列异常检测、群组聚类分析),进而为用户提供可理解的产品业务(比如:基线、用户及实体行为分析 UEBA)。


  • 行为特征异常检测:根据采集的样本数据(长时间)对统计分析对象建立行为基线,并以此基线为准,检测发现偏离正常行为模式的行为。例如:该用户通常从哪里发起连接?哪个运营商?哪个国家?哪个地区?这个用户行为异常在组织内是否为常见异常?

  • 时间序列异常检测:根据某一个或多个统计属性,判断按时间顺序排列的数值序列是否异常,由此通过监测指标变化来发现安全事件。例如:监测某网站每小时的访问量以防止 DDOS 攻击;建模每个账号传输文件大小的平均值,检测出传输文件大小的平均值离群的账号。

  • 群组聚类分析:对数据的特征属性间潜在相关性进行挖掘,将具有类似特征值的数据进行分组聚类。例如:该用户是否拥有任何特殊特征?可执行权限/特权用户?基于执行的操作命令和可访问的实体,来识别IT管理员、DBA 和其它高权限用户。



因为采用了 Flink 作为底层运行组件,所以 Sabre 具有与 Flink 等同的执行性能。并且,针对网络安全监测领域的特定需求,Sabre 还在以下方面进行了性能优化:


  • 全局组件(数据源、动态表)引用优化。由于 Kafka 类型的数据源 topic 有限,而规则数量可动态扩展,导致多个规则会有极大概率共用同一个数据源,根据 EPL 语义等价原则合并相同的数据源,进而可以减少数据输入总量及线程总数。

  • 全新的匹配引擎。序列 Sequence 算子采用了新颖的流式状态机引擎,复用了状态机缓存的状态,提升了匹配速度。类似优化还包含大规模 IP 匹配引擎和大规模串匹配引擎。在流量、日志中存在大规模 IP 和字符串匹配需求,通过 IP 匹配引擎和大规模串匹配引擎进行优化以提高效率。

  • 表计算表达式优化。对于规则中引用的动态表,会根据表达式的具体特性构建其对应的最优计算数据结构,以避免扫描全表数据,进而确保了执行的时间复杂度为常量值。

  • 自定义流式 Window 算子。采用“时间槽”技术实现了乱序纠正功能,并具有可以实时输出无重复、无遗漏告警的特性。

  • 图上字段自动推导,优化事件结构。根据规则前后逻辑关系,推导出规则中标注使用的原始日志相关字段,无须输出所有字段,以此优化输出事件结构,减少了输出事件大小。

  • 图上数据分区自动推导,优化流拓扑。由于特定的功能需要,Window 往往会缓存大量数据,以致消耗较多内存。通过对全局窗口 Hash 优化,避免所有全局窗口都分配到同一个 Taskmanager 进程,由此提高了引擎整体内存的利用率。



上图是 Sabre 流式状态机引擎的表示,其主要负责的功能是序列匹配。图中左边是标准的正则引擎,通常的流程可以从 Pattern 到语法树到 NFA 再到 DFA,也可以从 Paterrn 直接到 NFA;图左下侧是一个正则表达式的 NFA 表示,右侧是该正则表达式的 DFA 表示,使用该 DFA 的时候进行了改进(如图中绿色线)。其目的是为了在出现乱序的时候提升处理性能,在乱序发生在正则表达式后半段的时候,该改进对于性能提升的效果最好。



大规模正则引擎主要使用了两种互补的方法(图上半侧和下半侧)。在将 NFA 转向 DFA 的时候,很多情况下是不成功的,这种情况下往往会生成 DFA 的半成品,称为Unfinished-DFA,第一种方法属于混合状态自动机,包含 NFA 和 DFA,其适用于Pattern 量少于 1000 的情况。而第二种方法适用于 Pattern 量大于 1000 甚至上万的情况,该方法中首先需要寻找锚点,再做匹配,以降低整体的时间复杂度。这两种方法相结合能够较好地解决大规模正则匹配的问题。

 

产品运维



多级规则


多级规则是产品运维的一个显著特点。如下图所示,为满足复杂场景需求,一种规则的输出可直接作为另一种规则的输入。通过这种规则拆分的方式,能分层构造较为复杂的“多级规则”。如:图中的“暴力探测”规则结果可以直接回注到下面的“登陆成功 ”规则,而无须额外的通信组件,由此实现更为复杂的“暴力破解”规则。


 


服务化/多租户/资源监控


产品采用微服务架构,使用多租户、多任务来满足多个规则引擎的使用场景,同时对资源进行了实时监控来保证系统的稳定运行。




规则级的状态/资源监控


规则级的状态和资源监控是非常重要的产品需求,产品采用分布式监控,提供三级分布式监控能力(用户、任务和规则),并支持吞吐量、EPS、CPU 和内存的监控。




整体系统保护


整体系统保护主要涉及两方面,即流量控制和自我保护。


  • 流量控制:为了增强 Sabre 引擎的健壮性,避免因规则配置错误,导致生成大量无效告警,在输出端做了流量控制,以更好地保护下级应用。当下级抗压能力较弱时(例如数据库),整个系统会做输出降级。

  • 自我保护:跑在 JVM 上的程序,经常会遇到由于长时间 Full GC 导致 OOM 的错误,并且此时 CPU 占用率往往非常高,Flink 同样存在上述问题。自我保护功能采用了同时兼顾“Window隶属规则的优先级”及“Window引用规则数量”两个条件的加权算法,以此根据全局规则语义实现自动推导 Window 优先级,并根据此优先级确定各个 Window 的自我保护顺序。实时监控 CPU 及内存占用,当超过一定阈值时,智能优化事件分布,以防出现 CPU 长期过高或内存使用率过大而导致的 OOM 问题。



 

未来发展与思考


未来基于 Flink 构建的 Sabre 引擎会持续优化产品性能与功能,并将总结凝练项目中的优秀实践,及时回馈给 Apache Flink 社区。



后台回复「实时数仓」可查看作者现场分享的视频~

猜你喜欢:
罕见罚单:一农商行因"数据治理存在严重缺陷"被罚
小米流式平台|实时数仓架构演进与实践
HBase调优及优化的10种方式
数据同步之道(Sqoop、dataX、Kettle、Canal、StreamSets)
基于Flink SQL构建实时数据仓库
菜鸟数据中台技术演进之路





数仓社区

如有收获,请划至底部,点击“在看”,谢谢!


资源下载

关注公众号:数据仓库与Python大数据 回复关键字获取哦

06,数仓经典书籍

07,  python基础入门

中台,中台 PPT

体系,OneData体系PPT

实时数仓FFA 实时数仓视频回顾

Kettle,Kettle视频

Kylin,Kylin视频

Flink,Oracle 12.2体系结构图

Python,零基础学Python教程视频

升职加薪

关注我们,获取更多 技术干货与福利哦
你也「在看」吗?👇

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存